package com.zxt.mq.consumer;


import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.FactoryBean;

import cn.hutool.core.util.StrUtil;
import lombok.Builder;
import lombok.Data;

/**
 * <p>
 * 动态消费者管理类
 * </p>
 *
 * @author zxt
 * @since 2022/7/22 11:50
 */
@Data
@Builder
public class DynamicConsumerContainerFactory implements FactoryBean<SimpleMessageListenerContainer> {
    private String            directExchange;
    private String            topicExchange;
    private String            fanoutExchange;
    private String            queue;
    private String            routingKey;
    private Boolean           autoDeleted;
    private Boolean           durable;
    private Boolean           autoAck;
    private ConnectionFactory connectionFactory;
    private RabbitAdmin       rabbitAdmin;
    private Integer           concurrentNum;
    private IDynamicConsumer  consumer;


    private Exchange buildExchange() {
        if (null != directExchange) {
            return new DirectExchange(directExchange);
        } else if (null != topicExchange) {
            return new TopicExchange(topicExchange);
        } else if (null != fanoutExchange) {
            return new FanoutExchange(fanoutExchange);
        } else {
            if (StrUtil.isEmpty(routingKey)) {
                throw new IllegalArgumentException("defaultExchange's routingKey should not be null!");
            }
            return new DirectExchange("");
        }
    }

    private Queue buildQueue() {
        if (StrUtil.isEmpty(queue)) {
            throw new IllegalArgumentException("queue name should not be null!");
        }

        return new Queue(queue, durable != null && durable, false, autoDeleted == null || autoDeleted);
    }


    private Binding bind(Queue queue, Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
    }


    private void check() {
        if (null == rabbitAdmin || null == connectionFactory) {
            throw new IllegalArgumentException("rabbitAdmin and connectionFactory should not be null!");
        }
    }


    @Override
    public SimpleMessageListenerContainer getObject() {
        check();
        Queue queue = buildQueue();
        Exchange exchange = buildExchange();
        Binding binding = bind(queue, exchange);
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareBinding(binding);

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(queue);
        container.setPrefetchCount(20);
        container.setConcurrentConsumers(concurrentNum == null ? 1 : concurrentNum);
        // 应答方式 none 默认值 默认消息已消费成功；manual 手动应答 ；auto 自动应答于none区别为 自动应答需根据消费时有无异常或那种异常确认是是否成功消费
        container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
        if (null != consumer) {
            container.setMessageListener(consumer);
        }
        return container;
    }

    @Override
    public Class<?> getObjectType() {
        return SimpleMessageListenerContainer.class;
    }
}
